Fix read from multiple s3 regions #1453
Conversation
Hey @kevinjqliu, I hope you are ready for the Christmas time :-) After some investigation, I noticed the [...]. Would be keen to get some quick feedback, and I will add more unit tests if this sounds like a fix on the right track? Thanks!
@jiakai-li Thanks for working on this! And happy holidays :)
Looking through the usage for [...], I think that's one of the problems we need to tackle. The current S3 configuration requires a specific "region" to be set. This assumes that all data and metadata files are in the same region as the one specified. But what if I have some files in one region and some in another? I think a potential solution might be to omit the "region" property and allow the S3FileSystem to determine the proper region using [...]

Another potential issue is the way we cache the fs: it assumes that there's only one fs per scheme. With the region approach above, we break this assumption.
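To make that idea concrete, here is a minimal sketch (the bucket name and path are hypothetical, not from this PR) of letting pyarrow resolve a bucket's region per bucket instead of relying on one configured region:

```python
from pyarrow.fs import S3FileSystem, resolve_s3_region

# Hypothetical bucket; resolve its actual region, then build a filesystem pinned to it.
bucket = "example-bucket-us-east-1"
region = resolve_s3_region(bucket)  # one HTTP round trip to S3, cacheable per bucket
fs = S3FileSystem(region=region)
info = fs.get_file_info(f"{bucket}/path/to/data.parquet")
```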
BTW there's a similar issue in #1041.
Thank you @kevinjqliu, just trying to clear my head a little bit.

Is the change I made in accordance with this option? What I've done essentially is using the [...]

Please correct me if I'm missing something about how the fs cache works, but here is my understanding: I see we use [...]. I think solving the [...]
Can I tackle this issue as well if there is no one working on it?
I don't think [...]

iceberg-python/pyiceberg/io/pyarrow.py, lines 434 to 436 in dbcf65b

and running an example S3 URI: [...]

In order to support multiple regions, we might need to call [...]

BTW a good test scenario could be a table where the metadata files are stored in one bucket while the data files are stored in another. We might be able to construct this test case by modifying the [...]
I don't think anyone's working on it right now, feel free to pick it up.
Thank you @kevinjqliu, can I have some more guidance on this please?

I did some searching, and it seems that for the s3 scheme the format is [...]. In the example below, I would expect 'a' to be [...]
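(A sketch reconstructing that kind of example, not the original snippet: this is how Python's `urlparse` splits an S3 URI, with the bucket ending up in `netloc`.)

```python
from urllib.parse import urlparse

parsed = urlparse("s3://a/b/c")
print(parsed.scheme)  # s3
print(parsed.netloc)  # a   <- the bucket
print(parsed.path)    # /b/c
```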
Yep, I tested the change using a similar scenario locally with my own handcrafted s3 files. But will add more proper test cases as I make more progress. Thanks again!
ah yes, you're right. sorry for the confusion. I was thinking of something else.
BTW there are 2 FileIO implementations, one for pyarrow, another for fsspec. We might want to do the same for fsspec:

iceberg-python/pyiceberg/io/fsspec.py, lines 133 to 141 in dbcf65b
Sweet, I'll go ahead with this approach then. Thanks very much @kevinjqliu!
Hi @kevinjqliu, for the above concern, I tested it locally and also did some investigation. According to what I found here, it seems fsspec doesn't have the same issue as pyarrow. So I guess we can leave it?
Wow, that's interesting, I didn't know about that. I like that solution :) Hopefully pyarrow fs will have this feature one day.
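(For context, a minimal sketch of the fsspec/s3fs behavior being discussed; the bucket and key are hypothetical.)

```python
import s3fs

# With cache_regions=True, s3fs discovers each bucket's region on first access and
# caches it, so a missing or mismatched default region doesn't break reads the way it
# does with pyarrow's S3FileSystem.
fs = s3fs.S3FileSystem(cache_regions=True)
with fs.open("s3://example-bucket/data/part-0.parquet", "rb") as f:
    header = f.read(4)
```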
some more comments, thanks for working on this!
Co-authored-by: Kevin Liu <[email protected]>
pyiceberg/io/pyarrow.py (outdated):

```diff
@@ -1508,7 +1512,7 @@ def _record_batches_from_scan_tasks_and_deletes(
         if self._limit is not None and total_row_count >= self._limit:
             break
         batches = _task_to_record_batches(
-            self._fs,
+            _fs_from_file_path(task.file.file_path, self._io),
```
nice! I think this solves #1041 as well.
Yep, it is :-)
Nit: I noticed that we first pass in the path here, and the IO as a second argument. For `_read_all_delete_files` it is the other way around. How about unifying this?
This PR is ready for review now. Thanks very much and merry Christmas! Please let me know if any further change is required.
pyiceberg/io/pyarrow.py (outdated):

```diff
@@ -362,6 +362,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
             "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
         }

+        # Override the default s3.region if netloc (bucket) resolves to a different region
+        try:
+            client_kwargs["region"] = resolve_s3_region(netloc)
```
What about doing this lookup only when the region is not provided explicitly? I think this will do another call to S3.
Thank you Fokko, my understanding is that the problem occurs when the provided region doesn't match the data file's bucket region, and that fails the file read for pyarrow. By overriding with the bucket region (falling back to the provided region), we make sure the real region a data file is stored in takes precedence. (This function is cached when using `fs_by_scheme`, so it will only be called for new buckets that haven't been resolved previously, to save calls to S3.)
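(For illustration, a simplified, self-contained sketch of that per-bucket caching; this is not the actual pyiceberg code, which builds the filesystem from the FileIO properties.)

```python
from functools import lru_cache

from pyarrow.fs import FileSystem, S3FileSystem, resolve_s3_region


# The filesystem (and the bucket-region lookup inside it) is built once per
# (scheme, bucket) pair and reused for every later file in that bucket.
@lru_cache(maxsize=None)
def fs_by_scheme(scheme: str, netloc: str) -> FileSystem:
    return S3FileSystem(region=resolve_s3_region(netloc))
```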
I think there are these 3 cases we're worried about:
```
# region match
region=us-east-1
s3://foo-us-east-1/
s3://bar-us-east-1/

# region mismatch
region=us-west-2
s3://foo-us-east-1/
s3://bar-us-west-2/

# region not provided
region=None
s3://foo-us-east-1/
s3://bar-us-west-2/
```
We have a few options here:

1. use `region` when provided, fall back to `resolve_s3_region`
2. always use `resolve_s3_region`
3. use `resolve_s3_region`, fall back to `region`
Option 1 is difficult since we don't know that the provided `region` is wrong until we try to use the FileIO.

The code above uses option 2, which will always make an extra call to S3 to get the correct bucket region. This extra call to S3 is cached though, so `resolve_s3_region` is only called once per bucket. This is similar to the `cache_regions` option for `s3fs.core.S3FileSystem`.

I like option 3: we can resolve the bucket region and fall back to the provided `region`. It might be confusing to the end user when a `region` is specified but the FileIO uses a different region, so let's add a warning for that.
Something like this
```python
# Attempt to resolve the S3 region for the bucket, falling back to configured region if resolution fails
# Note, bucket resolution is cached and only called once per bucket
provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
try:
    bucket_region = resolve_s3_region(bucket=netloc)
except (OSError, TypeError):
    bucket_region = None
    logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {provided_region}")

if bucket_region and bucket_region != provided_region:
    logger.warning(
        f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
        f"provided region {provided_region}, actual region {bucket_region}"
    )

region = bucket_region or provided_region

client_kwargs: Dict[str, Any] = {
    "endpoint_override": self.properties.get(S3_ENDPOINT),
    "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
    "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
    "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
    "region": region,
}
```
Thanks for elaborating on this, I want to make sure that the user is aware of it, and I think we do that right with the warning.
For some additional context, for Java we don't have this issue because when you try to query the wrong region, the AWS SDK returns an HTTP 301 to the correct region. This introduces another 200 call but that's okay. The PyArrow implementation (that I believe uses the AWS C++ SDK underneath), just throws an error that it got a 301. We saw that in the past for example here: #515 (comment).
Another round of comments, thanks for working on this!
pyiceberg/io/pyarrow.py (outdated):

```diff
         if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+            from pyarrow.fs import S3FileSystem, resolve_s3_region
```
nit: since the `oss` scheme uses this path, does it also support `resolve_s3_region`?
Thank you @kevinjqliu. This is a really good catch. I didn't find too much information regarding support of oss regions by `pyarrow.fs.resolve_s3_region`, but I tried it on my end and it doesn't seem to work: it throws an error complaining that the bucket cannot be found.

This could be a problem though, especially if the same bucket name is used by both Aliyun and AWS, in which case the user-provided bucket region for Aliyun could be wrongly overwritten (by the resolved AWS one).

I separated the `oss` path from `s3` for now, as I'm not sure if we want to tackle `oss` now (and I feel we probably want to treat the two protocols differently?). I also broke the `_initialize_fs` code chunk into smaller blocks to make it a bit easier for future modification.
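(As a rough illustration of that split; the signatures are simplified and this is not the PR's actual code, which takes the full set of FileIO properties.)

```python
from typing import Optional

from pyarrow.fs import FileSystem, S3FileSystem, resolve_s3_region


def _initialize_oss_fs(netloc: Optional[str], configured_region: Optional[str]) -> FileSystem:
    # oss keeps the user-configured region untouched; no resolve_s3_region call here
    return S3FileSystem(region=configured_region)


def _initialize_s3_fs(netloc: Optional[str], configured_region: Optional[str]) -> FileSystem:
    # plain s3 schemes resolve the bucket's actual region
    return S3FileSystem(region=resolve_s3_region(netloc))


def _initialize_fs(scheme: str, netloc: Optional[str] = None, configured_region: Optional[str] = None) -> FileSystem:
    if scheme in {"oss"}:
        return _initialize_oss_fs(netloc, configured_region)
    elif scheme in {"s3", "s3a", "s3n"}:
        return _initialize_s3_fs(netloc, configured_region)
    raise ValueError(f"Unrecognized filesystem scheme: {scheme}")
```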
> I didn't find too much information regarding support of oss regions by `pyarrow.fs.resolve_s3_region`. But I tried it on my end and it doesn't seem to work as it throws me an error complaining the bucket cannot be found.
I don't think it's supported; the underlying call looks for `x-amz-bucket-region`, which I don't think Aliyun will set:
https://github.com/apache/arrow/blob/48d5151b87f1b8f977344c7ac20cb0810e46f733/cpp/src/arrow/filesystem/s3fs.cc#L660
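(A rough sketch of how that header-based lookup works, not Arrow's actual C++ code; the bucket name is hypothetical. S3 reports the region in the `x-amz-bucket-region` header even on 301/403 responses, and Aliyun OSS doesn't send this header.)

```python
import urllib.error
import urllib.request

request = urllib.request.Request("https://example-bucket.s3.amazonaws.com", method="HEAD")
try:
    region = urllib.request.urlopen(request).headers.get("x-amz-bucket-region")
except urllib.error.HTTPError as err:
    # the header is still present on error responses such as 301 or 403
    region = err.headers.get("x-amz-bucket-region")
print(region)
```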
> This could be a problem though, especially if the same bucket name is used by both Aliyun and AWS. In which case the user-provided bucket region for Aliyun could be wrongly overwritten (by the resolved AWS one).

Since we're using both scheme and bucket to cache the FS, this should be fine, right? For the case of `oss://bucket` and `s3://bucket`.

> I separate the oss path from s3 for now as I'm not sure if we want to tackle oss now (and I feel we probably want to treat the two protocols differently?). I also break the _initialize_fs code chunk into smaller blocks to make it a bit easier for future modification.

Yeah, let's just deal with s3 for now. BTW fsspec splits the constructor per fs, I think it looks pretty clean.
> I don't think it's supported, the underlying call is looking for `x-amz-bucket-region`, which I don't think Aliyun will set

Thank you for checking that, I should have looked at it :-)

> since we're using both scheme and bucket to cache FS, this should be fine right? For the case of `oss://bucket` and `s3://bucket`

Yes, there is no issue after the change now. What I was thinking of is the `oss://bucket` scenario (ignoring the caching behavior): if the bucket used by `oss` also exists in AWS, then the previous version (before your comment) would try to resolve the bucket and use it to override the default setting. This would be wrong though, because the `oss` bucket region cannot be resolved using pyarrow.
I updated the test case to take this into account and also added an integration test for multiple filesystem read.
LGTM! Thank you for working on this
Something's going on with the GitHub runner [...]

I saw that as well. Seems the [...]

#1485 to replace [...]
Sorry for leaving this hanging. I wanted to do some local checks to ensure that the caching works properly. The Arrow FS is pretty bulky, and we had some issues with the caching in the past which caused some performance regression.
I left some small comments, but this looks good to me 👍
pyiceberg/io/pyarrow.py (outdated):

```diff
@@ -190,13 +190,6 @@
 T = TypeVar("T")


-class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
```
People could rely on this for other things, I don't think we can just remove this without deprecation.
Let's revert this change, I don't like the inline declaration of the class.
pyiceberg/io/pyarrow.py (outdated):

```python
            if proxy_uri := self.properties.get(S3_PROXY_URI):
                client_kwargs["proxy_options"] = proxy_uri
        elif scheme in ("hdfs", "viewfs"):
```
Let's be consistent here:
elif scheme in ("hdfs", "viewfs"): | |
elif scheme in {"hdfs", "viewfs"}: |
```python
        bucket_region = bucket_region or provided_region
        if bucket_region != provided_region:
            logger.warning(
                f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
                f"provided region {provided_region}, actual region {bucket_region}"
            )
```
I like this one, thanks!
pyiceberg/io/pyarrow.py (outdated):

```python
        if scheme in {"oss"}:
            return self._initialize_oss_fs(scheme, netloc)
```
Do we know if Alibaba doesn't support this?
I didn't find an authoritative document explicitly saying it's not supported by pyarrow, but I tested it locally and it doesn't work for Alibaba. Kevin also helped check the pyarrow code in this comment: pyarrow seems to use the `x-amz-bucket-region` header to determine the bucket region, which seems to be an AWS-specific thing.
```python
            deprecation_message(
                deprecated_in="0.8.0",
                removed_in="0.9.0",
                help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
            )
```
Should we remove this one, while at it?
Thank you Fokko, do you mean to remove the `deprecation_message` or the `GCS_ENDPOINT` property? It says this option will be removed in 0.9.0, is it OK if we remove it now?
We can remove it now since the next release using the `main` branch will be `0.9.0`. But I'd prefer to remove it in a separate PR since there are also references to `GCS_ENDPOINT` in fsspec:
https://grep.app/search?q=GCS_ENDPOINT&filter[repo][0]=apache/iceberg-python
pyiceberg/io/pyarrow.py (outdated):

```python
                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
            }
        elif scheme in {"s3", "s3a", "s3n"}:
            return self._initialize_s3_fs(scheme, netloc)
```
Many of the methods don't require all the parameters; for example, `_initialize_s3_fs` does not use the `scheme`. Should we remove those?
LGTM! Looks like CI is passing now too.
Co-authored-by: Kevin Liu <[email protected]>
Thank you @jiakai-li for the contribution and @Fokko for the review :)
This PR closes: